home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2009 February / PCWFEB09.iso / Software / Linux / Kubuntu 8.10 / kubuntu-8.10-desktop-i386.iso / casper / filesystem.squashfs / usr / lib / python2.5 / threading.py < prev    next >
Text File  |  2008-10-05  |  29KB  |  892 lines

  1. """Thread module emulating a subset of Java's threading model."""
  2.  
  3. import sys as _sys
  4.  
  5. try:
  6.     import thread
  7. except ImportError:
  8.     del _sys.modules[__name__]
  9.     raise
  10.  
  11. from time import time as _time, sleep as _sleep
  12. from traceback import format_exc as _format_exc
  13. from collections import deque
  14.  
  15. # Rename some stuff so "from threading import *" is safe
  16. __all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
  17.            'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
  18.            'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
  19.  
  20. _start_new_thread = thread.start_new_thread
  21. _allocate_lock = thread.allocate_lock
  22. _get_ident = thread.get_ident
  23. ThreadError = thread.error
  24. del thread
  25.  
  26.  
  27. # Debug support (adapted from ihooks.py).
  28. # All the major classes here derive from _Verbose.  We force that to
  29. # be a new-style class so that all the major classes here are new-style.
  30. # This helps debugging (type(instance) is more revealing for instances
  31. # of new-style classes).
  32.  
  33. _VERBOSE = False
  34.  
  35. if __debug__:
  36.  
  37.     class _Verbose(object):
  38.  
  39.         def __init__(self, verbose=None):
  40.             if verbose is None:
  41.                 verbose = _VERBOSE
  42.             self.__verbose = verbose
  43.  
  44.         def _note(self, format, *args):
  45.             if self.__verbose:
  46.                 format = format % args
  47.                 format = "%s: %s\n" % (
  48.                     currentThread().getName(), format)
  49.                 _sys.stderr.write(format)
  50.  
  51. else:
  52.     # Disable this when using "python -O"
  53.     class _Verbose(object):
  54.         def __init__(self, verbose=None):
  55.             pass
  56.         def _note(self, *args):
  57.             pass
  58.  
  59. # Support for profile and trace hooks
  60.  
  61. _profile_hook = None
  62. _trace_hook = None
  63.  
  64. def setprofile(func):
  65.     global _profile_hook
  66.     _profile_hook = func
  67.  
  68. def settrace(func):
  69.     global _trace_hook
  70.     _trace_hook = func
  71.  
  72. # Synchronization classes
  73.  
  74. Lock = _allocate_lock
  75.  
  76. def RLock(*args, **kwargs):
  77.     return _RLock(*args, **kwargs)
  78.  
  79. class _RLock(_Verbose):
  80.  
  81.     def __init__(self, verbose=None):
  82.         _Verbose.__init__(self, verbose)
  83.         self.__block = _allocate_lock()
  84.         self.__owner = None
  85.         self.__count = 0
  86.  
  87.     def __repr__(self):
  88.         owner = self.__owner
  89.         return "<%s(%s, %d)>" % (
  90.                 self.__class__.__name__,
  91.                 owner and owner.getName(),
  92.                 self.__count)
  93.  
  94.     def acquire(self, blocking=1):
  95.         me = currentThread()
  96.         if self.__owner is me:
  97.             self.__count = self.__count + 1
  98.             if __debug__:
  99.                 self._note("%s.acquire(%s): recursive success", self, blocking)
  100.             return 1
  101.         rc = self.__block.acquire(blocking)
  102.         if rc:
  103.             self.__owner = me
  104.             self.__count = 1
  105.             if __debug__:
  106.                 self._note("%s.acquire(%s): initial success", self, blocking)
  107.         else:
  108.             if __debug__:
  109.                 self._note("%s.acquire(%s): failure", self, blocking)
  110.         return rc
  111.  
  112.     __enter__ = acquire
  113.  
  114.     def release(self):
  115.         if self.__owner is not currentThread():
  116.             raise RuntimeError("cannot release un-aquired lock")
  117.         self.__count = count = self.__count - 1
  118.         if not count:
  119.             self.__owner = None
  120.             self.__block.release()
  121.             if __debug__:
  122.                 self._note("%s.release(): final release", self)
  123.         else:
  124.             if __debug__:
  125.                 self._note("%s.release(): non-final release", self)
  126.  
  127.     def __exit__(self, t, v, tb):
  128.         self.release()
  129.  
  130.     # Internal methods used by condition variables
  131.  
  132.     def _acquire_restore(self, (count, owner)):
  133.         self.__block.acquire()
  134.         self.__count = count
  135.         self.__owner = owner
  136.         if __debug__:
  137.             self._note("%s._acquire_restore()", self)
  138.  
  139.     def _release_save(self):
  140.         if __debug__:
  141.             self._note("%s._release_save()", self)
  142.         count = self.__count
  143.         self.__count = 0
  144.         owner = self.__owner
  145.         self.__owner = None
  146.         self.__block.release()
  147.         return (count, owner)
  148.  
  149.     def _is_owned(self):
  150.         return self.__owner is currentThread()
  151.  
  152.  
  153. def Condition(*args, **kwargs):
  154.     return _Condition(*args, **kwargs)
  155.  
  156. class _Condition(_Verbose):
  157.  
  158.     def __init__(self, lock=None, verbose=None):
  159.         _Verbose.__init__(self, verbose)
  160.         if lock is None:
  161.             lock = RLock()
  162.         self.__lock = lock
  163.         # Export the lock's acquire() and release() methods
  164.         self.acquire = lock.acquire
  165.         self.release = lock.release
  166.         # If the lock defines _release_save() and/or _acquire_restore(),
  167.         # these override the default implementations (which just call
  168.         # release() and acquire() on the lock).  Ditto for _is_owned().
  169.         try:
  170.             self._release_save = lock._release_save
  171.         except AttributeError:
  172.             pass
  173.         try:
  174.             self._acquire_restore = lock._acquire_restore
  175.         except AttributeError:
  176.             pass
  177.         try:
  178.             self._is_owned = lock._is_owned
  179.         except AttributeError:
  180.             pass
  181.         self.__waiters = []
  182.  
  183.     def __enter__(self):
  184.         return self.__lock.__enter__()
  185.  
  186.     def __exit__(self, *args):
  187.         return self.__lock.__exit__(*args)
  188.  
  189.     def __repr__(self):
  190.         return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
  191.  
  192.     def _release_save(self):
  193.         self.__lock.release()           # No state to save
  194.  
  195.     def _acquire_restore(self, x):
  196.         self.__lock.acquire()           # Ignore saved state
  197.  
  198.     def _is_owned(self):
  199.         # Return True if lock is owned by currentThread.
  200.         # This method is called only if __lock doesn't have _is_owned().
  201.         if self.__lock.acquire(0):
  202.             self.__lock.release()
  203.             return False
  204.         else:
  205.             return True
  206.  
  207.     def wait(self, timeout=None):
  208.         if not self._is_owned():
  209.             raise RuntimeError("cannot wait on un-aquired lock")
  210.         waiter = _allocate_lock()
  211.         waiter.acquire()
  212.         self.__waiters.append(waiter)
  213.         saved_state = self._release_save()
  214.         try:    # restore state no matter what (e.g., KeyboardInterrupt)
  215.             if timeout is None:
  216.                 waiter.acquire()
  217.                 if __debug__:
  218.                     self._note("%s.wait(): got it", self)
  219.             else:
  220.                 # Balancing act:  We can't afford a pure busy loop, so we
  221.                 # have to sleep; but if we sleep the whole timeout time,
  222.                 # we'll be unresponsive.  The scheme here sleeps very
  223.                 # little at first, longer as time goes on, but never longer
  224.                 # than 20 times per second (or the timeout time remaining).
  225.                 endtime = _time() + timeout
  226.                 delay = 0.0005 # 500 us -> initial delay of 1 ms
  227.                 while True:
  228.                     gotit = waiter.acquire(0)
  229.                     if gotit:
  230.                         break
  231.                     remaining = endtime - _time()
  232.                     if remaining <= 0:
  233.                         break
  234.                     delay = min(delay * 2, remaining, .05)
  235.                     _sleep(delay)
  236.                 if not gotit:
  237.                     if __debug__:
  238.                         self._note("%s.wait(%s): timed out", self, timeout)
  239.                     try:
  240.                         self.__waiters.remove(waiter)
  241.                     except ValueError:
  242.                         pass
  243.                 else:
  244.                     if __debug__:
  245.                         self._note("%s.wait(%s): got it", self, timeout)
  246.         finally:
  247.             self._acquire_restore(saved_state)
  248.  
  249.     def notify(self, n=1):
  250.         if not self._is_owned():
  251.             raise RuntimeError("cannot notify on un-aquired lock")
  252.         __waiters = self.__waiters
  253.         waiters = __waiters[:n]
  254.         if not waiters:
  255.             if __debug__:
  256.                 self._note("%s.notify(): no waiters", self)
  257.             return
  258.         self._note("%s.notify(): notifying %d waiter%s", self, n,
  259.                    n!=1 and "s" or "")
  260.         for waiter in waiters:
  261.             waiter.release()
  262.             try:
  263.                 __waiters.remove(waiter)
  264.             except ValueError:
  265.                 pass
  266.  
  267.     def notifyAll(self):
  268.         self.notify(len(self.__waiters))
  269.  
  270.  
  271. def Semaphore(*args, **kwargs):
  272.     return _Semaphore(*args, **kwargs)
  273.  
  274. class _Semaphore(_Verbose):
  275.  
  276.     # After Tim Peters' semaphore class, but not quite the same (no maximum)
  277.  
  278.     def __init__(self, value=1, verbose=None):
  279.         if value < 0:
  280.             raise ValueError("semaphore initial value must be >= 0")
  281.         _Verbose.__init__(self, verbose)
  282.         self.__cond = Condition(Lock())
  283.         self.__value = value
  284.  
  285.     def acquire(self, blocking=1):
  286.         rc = False
  287.         self.__cond.acquire()
  288.         while self.__value == 0:
  289.             if not blocking:
  290.                 break
  291.             if __debug__:
  292.                 self._note("%s.acquire(%s): blocked waiting, value=%s",
  293.                            self, blocking, self.__value)
  294.             self.__cond.wait()
  295.         else:
  296.             self.__value = self.__value - 1
  297.             if __debug__:
  298.                 self._note("%s.acquire: success, value=%s",
  299.                            self, self.__value)
  300.             rc = True
  301.         self.__cond.release()
  302.         return rc
  303.  
  304.     __enter__ = acquire
  305.  
  306.     def release(self):
  307.         self.__cond.acquire()
  308.         self.__value = self.__value + 1
  309.         if __debug__:
  310.             self._note("%s.release: success, value=%s",
  311.                        self, self.__value)
  312.         self.__cond.notify()
  313.         self.__cond.release()
  314.  
  315.     def __exit__(self, t, v, tb):
  316.         self.release()
  317.  
  318.  
  319. def BoundedSemaphore(*args, **kwargs):
  320.     return _BoundedSemaphore(*args, **kwargs)
  321.  
  322. class _BoundedSemaphore(_Semaphore):
  323.     """Semaphore that checks that # releases is <= # acquires"""
  324.     def __init__(self, value=1, verbose=None):
  325.         _Semaphore.__init__(self, value, verbose)
  326.         self._initial_value = value
  327.  
  328.     def release(self):
  329.         if self._Semaphore__value >= self._initial_value:
  330.             raise ValueError, "Semaphore released too many times"
  331.         return _Semaphore.release(self)
  332.  
  333.  
  334. def Event(*args, **kwargs):
  335.     return _Event(*args, **kwargs)
  336.  
  337. class _Event(_Verbose):
  338.  
  339.     # After Tim Peters' event class (without is_posted())
  340.  
  341.     def __init__(self, verbose=None):
  342.         _Verbose.__init__(self, verbose)
  343.         self.__cond = Condition(Lock())
  344.         self.__flag = False
  345.  
  346.     def isSet(self):
  347.         return self.__flag
  348.  
  349.     def set(self):
  350.         self.__cond.acquire()
  351.         try:
  352.             self.__flag = True
  353.             self.__cond.notifyAll()
  354.         finally:
  355.             self.__cond.release()
  356.  
  357.     def clear(self):
  358.         self.__cond.acquire()
  359.         try:
  360.             self.__flag = False
  361.         finally:
  362.             self.__cond.release()
  363.  
  364.     def wait(self, timeout=None):
  365.         self.__cond.acquire()
  366.         try:
  367.             if not self.__flag:
  368.                 self.__cond.wait(timeout)
  369.         finally:
  370.             self.__cond.release()
  371.  
  372. # Helper to generate new thread names
  373. _counter = 0
  374. def _newname(template="Thread-%d"):
  375.     global _counter
  376.     _counter = _counter + 1
  377.     return template % _counter
  378.  
  379. # Active thread administration
  380. _active_limbo_lock = _allocate_lock()
  381. _active = {}    # maps thread id to Thread object
  382. _limbo = {}
  383.  
  384.  
  385. # Main class for threads
  386.  
  387. class Thread(_Verbose):
  388.  
  389.     __initialized = False
  390.     # Need to store a reference to sys.exc_info for printing
  391.     # out exceptions when a thread tries to use a global var. during interp.
  392.     # shutdown and thus raises an exception about trying to perform some
  393.     # operation on/with a NoneType
  394.     __exc_info = _sys.exc_info
  395.  
  396.     def __init__(self, group=None, target=None, name=None,
  397.                  args=(), kwargs=None, verbose=None):
  398.         assert group is None, "group argument must be None for now"
  399.         _Verbose.__init__(self, verbose)
  400.         if kwargs is None:
  401.             kwargs = {}
  402.         self.__target = target
  403.         self.__name = str(name or _newname())
  404.         self.__args = args
  405.         self.__kwargs = kwargs
  406.         self.__daemonic = self._set_daemon()
  407.         self.__started = False
  408.         self.__stopped = False
  409.         self.__block = Condition(Lock())
  410.         self.__initialized = True
  411.         # sys.stderr is not stored in the class like
  412.         # sys.exc_info since it can be changed between instances
  413.         self.__stderr = _sys.stderr
  414.  
  415.     def _set_daemon(self):
  416.         # Overridden in _MainThread and _DummyThread
  417.         return currentThread().isDaemon()
  418.  
  419.     def __repr__(self):
  420.         assert self.__initialized, "Thread.__init__() was not called"
  421.         status = "initial"
  422.         if self.__started:
  423.             status = "started"
  424.         if self.__stopped:
  425.             status = "stopped"
  426.         if self.__daemonic:
  427.             status = status + " daemon"
  428.         return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
  429.  
  430.     def start(self):
  431.         if not self.__initialized:
  432.             raise RuntimeError("thread.__init__() not called")
  433.         if self.__started:
  434.             raise RuntimeError("thread already started")
  435.         if __debug__:
  436.             self._note("%s.start(): starting thread", self)
  437.         _active_limbo_lock.acquire()
  438.         _limbo[self] = self
  439.         _active_limbo_lock.release()
  440.         _start_new_thread(self.__bootstrap, ())
  441.         self.__started = True
  442.         _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)
  443.  
  444.     def run(self):
  445.         if self.__target:
  446.             self.__target(*self.__args, **self.__kwargs)
  447.  
  448.     def __bootstrap(self):
  449.         # Wrapper around the real bootstrap code that ignores
  450.         # exceptions during interpreter cleanup.  Those typically
  451.         # happen when a daemon thread wakes up at an unfortunate
  452.         # moment, finds the world around it destroyed, and raises some
  453.         # random exception *** while trying to report the exception in
  454.         # __bootstrap_inner() below ***.  Those random exceptions
  455.         # don't help anybody, and they confuse users, so we suppress
  456.         # them.  We suppress them only when it appears that the world
  457.         # indeed has already been destroyed, so that exceptions in
  458.         # __bootstrap_inner() during normal business hours are properly
  459.         # reported.  Also, we only suppress them for daemonic threads;
  460.         # if a non-daemonic encounters this, something else is wrong.
  461.         try:
  462.             self.__bootstrap_inner()
  463.         except:
  464.             if self.__daemonic and _sys is None:
  465.                 return
  466.             raise
  467.  
  468.     def __bootstrap_inner(self):
  469.         try:
  470.             self.__started = True
  471.             _active_limbo_lock.acquire()
  472.             _active[_get_ident()] = self
  473.             del _limbo[self]
  474.             _active_limbo_lock.release()
  475.             if __debug__:
  476.                 self._note("%s.__bootstrap(): thread started", self)
  477.  
  478.             if _trace_hook:
  479.                 self._note("%s.__bootstrap(): registering trace hook", self)
  480.                 _sys.settrace(_trace_hook)
  481.             if _profile_hook:
  482.                 self._note("%s.__bootstrap(): registering profile hook", self)
  483.                 _sys.setprofile(_profile_hook)
  484.  
  485.             try:
  486.                 self.run()
  487.             except SystemExit:
  488.                 if __debug__:
  489.                     self._note("%s.__bootstrap(): raised SystemExit", self)
  490.             except:
  491.                 if __debug__:
  492.                     self._note("%s.__bootstrap(): unhandled exception", self)
  493.                 # If sys.stderr is no more (most likely from interpreter
  494.                 # shutdown) use self.__stderr.  Otherwise still use sys (as in
  495.                 # _sys) in case sys.stderr was redefined since the creation of
  496.                 # self.
  497.                 if _sys:
  498.                     _sys.stderr.write("Exception in thread %s:\n%s\n" %
  499.                                       (self.getName(), _format_exc()))
  500.                 else:
  501.                     # Do the best job possible w/o a huge amt. of code to
  502.                     # approximate a traceback (code ideas from
  503.                     # Lib/traceback.py)
  504.                     exc_type, exc_value, exc_tb = self.__exc_info()
  505.                     try:
  506.                         print>>self.__stderr, (
  507.                             "Exception in thread " + self.getName() +
  508.                             " (most likely raised during interpreter shutdown):")
  509.                         print>>self.__stderr, (
  510.                             "Traceback (most recent call last):")
  511.                         while exc_tb:
  512.                             print>>self.__stderr, (
  513.                                 '  File "%s", line %s, in %s' %
  514.                                 (exc_tb.tb_frame.f_code.co_filename,
  515.                                     exc_tb.tb_lineno,
  516.                                     exc_tb.tb_frame.f_code.co_name))
  517.                             exc_tb = exc_tb.tb_next
  518.                         print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
  519.                     # Make sure that exc_tb gets deleted since it is a memory
  520.                     # hog; deleting everything else is just for thoroughness
  521.                     finally:
  522.                         del exc_type, exc_value, exc_tb
  523.             else:
  524.                 if __debug__:
  525.                     self._note("%s.__bootstrap(): normal return", self)
  526.         finally:
  527.             _active_limbo_lock.acquire()
  528.             try:
  529.                 self.__stop()
  530.                 try:
  531.                     # We don't call self.__delete() because it also
  532.                     # grabs _active_limbo_lock.
  533.                     del _active[_get_ident()]
  534.                 except:
  535.                     pass
  536.             finally:
  537.                 _active_limbo_lock.release()
  538.  
  539.     def __stop(self):
  540.         self.__block.acquire()
  541.         self.__stopped = True
  542.         self.__block.notifyAll()
  543.         self.__block.release()
  544.  
  545.     def __delete(self):
  546.         "Remove current thread from the dict of currently running threads."
  547.  
  548.         # Notes about running with dummy_thread:
  549.         #
  550.         # Must take care to not raise an exception if dummy_thread is being
  551.         # used (and thus this module is being used as an instance of
  552.         # dummy_threading).  dummy_thread.get_ident() always returns -1 since
  553.         # there is only one thread if dummy_thread is being used.  Thus
  554.         # len(_active) is always <= 1 here, and any Thread instance created
  555.         # overwrites the (if any) thread currently registered in _active.
  556.         #
  557.         # An instance of _MainThread is always created by 'threading'.  This
  558.         # gets overwritten the instant an instance of Thread is created; both
  559.         # threads return -1 from dummy_thread.get_ident() and thus have the
  560.         # same key in the dict.  So when the _MainThread instance created by
  561.         # 'threading' tries to clean itself up when atexit calls this method
  562.         # it gets a KeyError if another Thread instance was created.
  563.         #
  564.         # This all means that KeyError from trying to delete something from
  565.         # _active if dummy_threading is being used is a red herring.  But
  566.         # since it isn't if dummy_threading is *not* being used then don't
  567.         # hide the exception.
  568.  
  569.         _active_limbo_lock.acquire()
  570.         try:
  571.             try:
  572.                 del _active[_get_ident()]
  573.             except KeyError:
  574.                 if 'dummy_threading' not in _sys.modules:
  575.                     raise
  576.         finally:
  577.             _active_limbo_lock.release()
  578.  
  579.     def join(self, timeout=None):
  580.         if not self.__initialized:
  581.             raise RuntimeError("Thread.__init__() not called")
  582.         if not self.__started:
  583.             raise RuntimeError("cannot join thread before it is started")
  584.         if self is currentThread():
  585.             raise RuntimeError("cannot join current thread")
  586.  
  587.         if __debug__:
  588.             if not self.__stopped:
  589.                 self._note("%s.join(): waiting until thread stops", self)
  590.         self.__block.acquire()
  591.         try:
  592.             if timeout is None:
  593.                 while not self.__stopped:
  594.                     self.__block.wait()
  595.                 if __debug__:
  596.                     self._note("%s.join(): thread stopped", self)
  597.             else:
  598.                 deadline = _time() + timeout
  599.                 while not self.__stopped:
  600.                     delay = deadline - _time()
  601.                     if delay <= 0:
  602.                         if __debug__:
  603.                             self._note("%s.join(): timed out", self)
  604.                         break
  605.                     self.__block.wait(delay)
  606.                 else:
  607.                     if __debug__:
  608.                         self._note("%s.join(): thread stopped", self)
  609.         finally:
  610.             self.__block.release()
  611.  
  612.     def getName(self):
  613.         assert self.__initialized, "Thread.__init__() not called"
  614.         return self.__name
  615.  
  616.     def setName(self, name):
  617.         assert self.__initialized, "Thread.__init__() not called"
  618.         self.__name = str(name)
  619.  
  620.     def isAlive(self):
  621.         assert self.__initialized, "Thread.__init__() not called"
  622.         return self.__started and not self.__stopped
  623.  
  624.     def isDaemon(self):
  625.         assert self.__initialized, "Thread.__init__() not called"
  626.         return self.__daemonic
  627.  
  628.     def setDaemon(self, daemonic):
  629.         if not self.__initialized:
  630.             raise RuntimeError("Thread.__init__() not called")
  631.         if self.__started:
  632.             raise RuntimeError("cannot set daemon status of active thread");
  633.         self.__daemonic = daemonic
  634.  
  635. # The timer class was contributed by Itamar Shtull-Trauring
  636.  
  637. def Timer(*args, **kwargs):
  638.     return _Timer(*args, **kwargs)
  639.  
  640. class _Timer(Thread):
  641.     """Call a function after a specified number of seconds:
  642.  
  643.     t = Timer(30.0, f, args=[], kwargs={})
  644.     t.start()
  645.     t.cancel() # stop the timer's action if it's still waiting
  646.     """
  647.  
  648.     def __init__(self, interval, function, args=[], kwargs={}):
  649.         Thread.__init__(self)
  650.         self.interval = interval
  651.         self.function = function
  652.         self.args = args
  653.         self.kwargs = kwargs
  654.         self.finished = Event()
  655.  
  656.     def cancel(self):
  657.         """Stop the timer if it hasn't finished yet"""
  658.         self.finished.set()
  659.  
  660.     def run(self):
  661.         self.finished.wait(self.interval)
  662.         if not self.finished.isSet():
  663.             self.function(*self.args, **self.kwargs)
  664.         self.finished.set()
  665.  
  666. # Special thread class to represent the main thread
  667. # This is garbage collected through an exit handler
  668.  
  669. class _MainThread(Thread):
  670.  
  671.     def __init__(self):
  672.         Thread.__init__(self, name="MainThread")
  673.         self._Thread__started = True
  674.         _active_limbo_lock.acquire()
  675.         _active[_get_ident()] = self
  676.         _active_limbo_lock.release()
  677.  
  678.     def _set_daemon(self):
  679.         return False
  680.  
  681.     def _exitfunc(self):
  682.         self._Thread__stop()
  683.         t = _pickSomeNonDaemonThread()
  684.         if t:
  685.             if __debug__:
  686.                 self._note("%s: waiting for other threads", self)
  687.         while t:
  688.             t.join()
  689.             t = _pickSomeNonDaemonThread()
  690.         if __debug__:
  691.             self._note("%s: exiting", self)
  692.         self._Thread__delete()
  693.  
  694. def _pickSomeNonDaemonThread():
  695.     for t in enumerate():
  696.         if not t.isDaemon() and t.isAlive():
  697.             return t
  698.     return None
  699.  
  700.  
  701. # Dummy thread class to represent threads not started here.
  702. # These aren't garbage collected when they die, nor can they be waited for.
  703. # If they invoke anything in threading.py that calls currentThread(), they
  704. # leave an entry in the _active dict forever after.
  705. # Their purpose is to return *something* from currentThread().
  706. # They are marked as daemon threads so we won't wait for them
  707. # when we exit (conform previous semantics).
  708.  
  709. class _DummyThread(Thread):
  710.  
  711.     def __init__(self):
  712.         Thread.__init__(self, name=_newname("Dummy-%d"))
  713.  
  714.         # Thread.__block consumes an OS-level locking primitive, which
  715.         # can never be used by a _DummyThread.  Since a _DummyThread
  716.         # instance is immortal, that's bad, so release this resource.
  717.         del self._Thread__block
  718.  
  719.         self._Thread__started = True
  720.         _active_limbo_lock.acquire()
  721.         _active[_get_ident()] = self
  722.         _active_limbo_lock.release()
  723.  
  724.     def _set_daemon(self):
  725.         return True
  726.  
  727.     def join(self, timeout=None):
  728.         assert False, "cannot join a dummy thread"
  729.  
  730.  
  731. # Global API functions
  732.  
  733. def currentThread():
  734.     try:
  735.         return _active[_get_ident()]
  736.     except KeyError:
  737.         ##print "currentThread(): no current thread for", _get_ident()
  738.         return _DummyThread()
  739.  
  740. def activeCount():
  741.     _active_limbo_lock.acquire()
  742.     count = len(_active) + len(_limbo)
  743.     _active_limbo_lock.release()
  744.     return count
  745.  
  746. def enumerate():
  747.     _active_limbo_lock.acquire()
  748.     active = _active.values() + _limbo.values()
  749.     _active_limbo_lock.release()
  750.     return active
  751.  
  752. from thread import stack_size
  753.  
  754. # Create the main thread object,
  755. # and make it available for the interpreter
  756. # (Py_Main) as threading._shutdown.
  757.  
  758. _shutdown = _MainThread()._exitfunc
  759.  
  760. # get thread-local implementation, either from the thread
  761. # module, or from the python fallback
  762.  
  763. try:
  764.     from thread import _local as local
  765. except ImportError:
  766.     from _threading_local import local
  767.  
  768.  
  769. def _after_fork():
  770.     # This function is called by Python/ceval.c:PyEval_ReInitThreads which
  771.     # is called from PyOS_AfterFork.  Here we cleanup threading module state
  772.     # that should not exist after a fork.
  773.  
  774.     # Reset _active_limbo_lock, in case we forked while the lock was held
  775.     # by another (non-forked) thread.  http://bugs.python.org/issue874900
  776.     global _active_limbo_lock
  777.     _active_limbo_lock = _allocate_lock()
  778.  
  779.     # fork() only copied the current thread; clear references to others.
  780.     new_active = {}
  781.     current = currentThread()
  782.     _active_limbo_lock.acquire()
  783.     try:
  784.         for thread in _active.itervalues():
  785.             if thread is current:
  786.                 # There is only one active thread. We reset the ident to
  787.                 # its new value since it can have changed.
  788.                 ident = _get_ident()
  789.                 thread._Thread__ident = ident
  790.                 new_active[ident] = thread
  791.             else:
  792.                 # All the others are already stopped.
  793.                 # We don't call _Thread__stop() because it tries to acquire
  794.                 # thread._Thread__block which could also have been held while
  795.                 # we forked.
  796.                 thread._Thread__stopped = True
  797.  
  798.         _limbo.clear()
  799.         _active.clear()
  800.         _active.update(new_active)
  801.         assert len(_active) == 1
  802.     finally:
  803.         _active_limbo_lock.release()
  804.  
  805.  
  806. # Self-test code
  807.  
  808. def _test():
  809.  
  810.     class BoundedQueue(_Verbose):
  811.  
  812.         def __init__(self, limit):
  813.             _Verbose.__init__(self)
  814.             self.mon = RLock()
  815.             self.rc = Condition(self.mon)
  816.             self.wc = Condition(self.mon)
  817.             self.limit = limit
  818.             self.queue = deque()
  819.  
  820.         def put(self, item):
  821.             self.mon.acquire()
  822.             while len(self.queue) >= self.limit:
  823.                 self._note("put(%s): queue full", item)
  824.                 self.wc.wait()
  825.             self.queue.append(item)
  826.             self._note("put(%s): appended, length now %d",
  827.                        item, len(self.queue))
  828.             self.rc.notify()
  829.             self.mon.release()
  830.  
  831.         def get(self):
  832.             self.mon.acquire()
  833.             while not self.queue:
  834.                 self._note("get(): queue empty")
  835.                 self.rc.wait()
  836.             item = self.queue.popleft()
  837.             self._note("get(): got %s, %d left", item, len(self.queue))
  838.             self.wc.notify()
  839.             self.mon.release()
  840.             return item
  841.  
  842.     class ProducerThread(Thread):
  843.  
  844.         def __init__(self, queue, quota):
  845.             Thread.__init__(self, name="Producer")
  846.             self.queue = queue
  847.             self.quota = quota
  848.  
  849.         def run(self):
  850.             from random import random
  851.             counter = 0
  852.             while counter < self.quota:
  853.                 counter = counter + 1
  854.                 self.queue.put("%s.%d" % (self.getName(), counter))
  855.                 _sleep(random() * 0.00001)
  856.  
  857.  
  858.     class ConsumerThread(Thread):
  859.  
  860.         def __init__(self, queue, count):
  861.             Thread.__init__(self, name="Consumer")
  862.             self.queue = queue
  863.             self.count = count
  864.  
  865.         def run(self):
  866.             while self.count > 0:
  867.                 item = self.queue.get()
  868.                 print item
  869.                 self.count = self.count - 1
  870.  
  871.     NP = 3
  872.     QL = 4
  873.     NI = 5
  874.  
  875.     Q = BoundedQueue(QL)
  876.     P = []
  877.     for i in range(NP):
  878.         t = ProducerThread(Q, NI)
  879.         t.setName("Producer-%d" % (i+1))
  880.         P.append(t)
  881.     C = ConsumerThread(Q, NI*NP)
  882.     for t in P:
  883.         t.start()
  884.         _sleep(0.000001)
  885.     C.start()
  886.     for t in P:
  887.         t.join()
  888.     C.join()
  889.  
  890. if __name__ == '__main__':
  891.     _test()
  892.